package com.california.pay.socket;

import com.bwton.socket.exception.SocketAbstractException;
import com.bwton.socket.exception.SocketErrorMsgConstant;
import com.bwton.socket.exception.SocketNetworkException;
import com.bwton.socket.exception.SocketServiceException;
import com.bwton.socket.log.LoggerUtil;
import com.bwton.socket.transport.URL;
import com.bwton.socket.transport.call.*;
import com.bwton.socket.util.RemotingUtil;
import com.california.pay.persist.mapper.MchPayQuotaMapper;
import com.california.pay.persist.mapper.TerminalLoginMapper;
import io.netty.channel.ChannelFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.*;

import static com.bwton.socket.common.SocketConstants.NETTY_TIMEOUT_TIMER_PERIOD;

@Slf4j
public class ProviderClientService {
    @Autowired
    private MchPayQuotaMapper quotaMapper;
    @Autowired
    private TerminalLoginMapper loginMapper;

    /**
     * 回收过期任务
     */
    private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(4);


    /**
     * 异步的request，需要注册callback future
     * 触发remove的操作有： 1) service的返回结果处理。 2) timeout thread cancel
     */
    private ConcurrentMap<String, ResponseFuture> callbackMap = new ConcurrentHashMap<>();

    private ScheduledFuture<?> timeMonitorFuture = null;

    private URL serverUrl;

    public ProviderClientService(){
        timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay(
                new TimeoutMonitor("timeout_monitor_thread____" ),
                NETTY_TIMEOUT_TIMER_PERIOD, NETTY_TIMEOUT_TIMER_PERIOD, TimeUnit.MILLISECONDS);
    }


    private void registerCallback(String requestId, ResponseFuture responseFuture) {
        if (this.callbackMap.size() >= 20000) {
            // reject request, prevent from OutOfMemoryError
            throw new SocketServiceException("NettyClient over of max concurrent request, drop request, requestId=" + requestId, SocketErrorMsgConstant.SERVICE_REJECT);
        }

        // 短时间内重复发送过滤
        if (this.callbackMap.containsKey(requestId)) {
            throw new SocketServiceException("NettyClient 已经包含该请求，正等待响应，请勿重新发送, requestId=" + requestId, SocketErrorMsgConstant.SERVICE_REJECT);
        }

        this.callbackMap.put(requestId, responseFuture);
    }

    public ResponseFuture removeCallback(String requestId) {
        return callbackMap.remove(requestId);
    }

    public Response request(Request request, int requestTimeout) {
        Response response;
        try {
            response = syncRequest(request, requestTimeout);
            SocketResponse socketResponse = new SocketResponse(response);
            //释放Response
            response.clear();
            return socketResponse;
        } catch (Exception e) {
            LoggerUtil.error(log, "请求异常: remoteAddress:" +RemotingUtil.parseLocalAddress(request.getNettyChannel()), e);
            if (e instanceof SocketAbstractException) {
                throw e;
            } else {
                throw new SocketServiceException("请求异常: remoteAddress:" +RemotingUtil.parseLocalAddress(request.getNettyChannel()), e);
            }
        }
    }

    public Response syncRequest(Request request, int requestTimeout) {
        ResponseFuture response = new SocketResponseFuture(request, requestTimeout, serverUrl);
        TransSocketMessage socketMessage = (TransSocketMessage)request.getTms();
        byte[] msg = this.encode(request);
        this.registerCallback(request.getRequestId(), response);

        // 连接不存在
/*        if (request.getNettyChannel() == null || !request.getNettyChannel().isActive()){
            quotaMapper.updateStatusByChannelMchId(Status.DISABLE, channelMchId, Long.parseLong(DateUtils.yyyyMMdd(new Date())));
            loginMapper.deleteByChannelMchId(channelMchId);
        }*/

        ChannelFuture writeFuture = request.getNettyChannel().writeAndFlush(msg);

        boolean result = writeFuture.awaitUninterruptibly(requestTimeout, TimeUnit.MILLISECONDS);

        // 数据发送完成
        if (result && writeFuture.isSuccess()) {
            response.addListener(future -> {
                if (future.isSuccess() || (future.isDone() && future.getException() != null)) {
                    log.info("服务端 >> 终端，发送消息成功，messageId={}", socketMessage.getSeq());
                } else {
                    log.info("服务端 >> 终端，发送消息失败，message={}", socketMessage.getSocketBody());
                }
            });
            return response;
        }

        writeFuture.cancel(true);
        response = this.removeCallback(request.getRequestId());
        if (response != null) {
            response.cancel();
        }

        if (writeFuture.cause() != null) {
            throw new SocketServiceException("NettyChannel 发送请求到服务端异常: address="+ RemotingUtil.parseRemoteAddress(request.getNettyChannel())+ ", cause=" +writeFuture.cause());
        } else {
            // 服务端可能收到请求
            throw new SocketNetworkException("NettyChannel 发送请求到服务端超时: address="+ RemotingUtil.parseRemoteAddress(request.getNettyChannel())+ ", cause=" +writeFuture.cause());
        }
    }

    public void setServerUrl(URL serverUrl) {
        this.serverUrl = serverUrl;
    }


    private byte[] encode( Request request) {
        SocketMessage socketMessage = request.getTms();

        TransSocketMessage transSocketMessage = (TransSocketMessage)socketMessage;
        String socketBody = transSocketMessage.getSocketBody();

        if (StringUtils.isNotBlank(socketBody)){
            try {
                return socketBody.getBytes("utf-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        return null;
    }

    public void callbackRequest(Response response){
        ResponseFuture responseFuture = removeCallback(response.getRequestId());
        // future 为空，且解码不为空
        if (responseFuture == null ) {
            LoggerUtil.warn(log, "NettyClient 已接受服务端响应, 但responseFuture不存在(解码成功), requestId={}, serviceKey={}", response.getRequest(), response.getServiceKey());
            return;
        }
        if (log.isInfoEnabled()) {
            log.info("解码 > 发送数据 > 接受数据 > 解码花费时间:" + (System.currentTimeMillis() - responseFuture.getCreateTime()));
        }
        responseFuture.onSuccess(response);
    }

    /**
     * 回收超时任务
     */
    class TimeoutMonitor implements Runnable {
        private String name;

        public TimeoutMonitor(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            long currentTime = System.currentTimeMillis();
            for (Map.Entry<String, ResponseFuture> entry : callbackMap.entrySet()) {
                try {
                    ResponseFuture future = entry.getValue();

                    // 超时：从callback移除，并取消操作,
                    if (future.getCreateTime() + future.getTimeout() < currentTime) {
                        removeCallback(entry.getKey());
                        LoggerUtil.warn(log, name + " 清除超时future:  requestId=" + entry.getKey());
                        future.cancel();
                    }
                } catch (Exception e) {
                    LoggerUtil.error(log, name + " 清除超时future异常: requestId=" + entry.getKey(), e);
                }
            }
        }
    }
}
